BigQueryのテーブルをParquet出力する(Python / Apache Beam / Dataflow)
良い子のみなさーん!列指向でスキーマ情報も柔軟に変更できる万能なファイルフォーマットと言えばー?
せーの!?
「「「Parquet!!」」」
そんな男は黙ってApache Parquetな皆さんに向けて、BigQueryテーブルのParquet出力方法をご紹介していきたいと思います。
ちなみにですが、BigQuery自体にはなんとParquet出力のオプションがありません。2020年12月現在は、CSV、JSON、Avroのみです。Avroもリッチなファイルフォーマットですが、行指向のためそのまま分析業務に使用するのは不向きです。
BigQueryのテーブルをParquet出力するには、Google Cloud Dataflowを使用する必要があるのですが、その実装が結構骨折れるんですよね…。
Google Cloud DataflowとApache Beam
Dataflowは、データ処理・バッチ処理を得意としたサーバレスコンピューティングサービスです。バッチ処理において割と頭を使うプロビジョニングやスケーリングを自動化してくれます。
Dataflowの一番の特徴といえば、オープンソースのApache Beamをエンジンに採用している点でしょうか。Apache Beamはバッチ・ストリーミング処理のための統合プログラミングモデルで、大規模なデータ処理パイプラインをシンプルに記述することができます。
※出典:Apache Flink: Apache Beam: How Beam Runs on Top of Flink
興味深い話、Googleは過去OSSコミュニティとのイノベーションの共有に関して冷淡な姿勢を取っていたそうですが、このDataflowを開発する際にApache Beamを共同で立ち上げ、イノベーションを共有していく方向へ転換したそうです。
Apache BeamのコンセプトはDataflowの公式ドキュメントにまとめられているので、興味ある方は見てみてください。
今回はデータ変換処理を行わず、テーブルをParquet形式でエクスポートするだけなので、Apache Beamを深く理解していなくても実装可能です。
事前準備
- 環境
- Python 3.8.6
- google-cloud-bigquery==2.4.0
- google-apitools==0.5.31
- pyarrow==1.0.1
- apache-beam==2.24.0
このPythonライブラリのバージョン依存関係が結構シビアなんですよね。上に加えてpandas
も使おうとすると更に競合関係が増えるかと思います。pip install
で以下のようなErrorが出ても、動けばOKと思っておいてください。
apache-beam 2.24.0 requires pyarrow<0.18.0,>=0.15.1; python_version >= "3.0" or platform_system != "Windows", but you'll have pyarrow 1.0.1 which is incompatible.
BigQuery側にも適当にテーブルとデータを用意しておきます。まずはデータセットを作成します。今回はデータセット名はtmp
で、ロケーションはUSにしておきました。
その後以下のクエリを発行して、BigQuery公式パブリックデータのwikipedia
から100件だけ抽出してテーブルを作成します。
CREATE TABLE tmp.test_arquet_export AS SELECT * FROM bigquery-public-data.samples.wikipedia LIMIT 100;
このtmp.test_parquet_export
のデータをParquet出力していきます。Parquetの出力には、Dataflowで使用するGCSのバケットが必要なので、test_parquet_export
という名前で作成しました。設定は全部デフォルトです。
続いて、使用するプロジェクトのBigQuery APIとDataflow APIを有効にしておきます。コンソールのAPI & Services
から、ENABLE API AND SERVICES
をクリックし、BigQuery APIとDataflow APIを追加してください。
最後に、権限の設定を行います。以下の権限を追加すればOKです。Dataflowを動かすための権限数って結構多いんですよねー。
使用するPythonコード
作成したPythonコードは以下の通りです。
# bq_parquet_export.py import pyarrow as pa import apache_beam as beam from google.cloud import bigquery from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import WorkerOptions class DataFlowOptions(): def __init__(self, project_id, service_account_email, job_name, staging_location, temp_location): self.options = PipelineOptions() self.options.view_as(GoogleCloudOptions).project = project_id self.options.view_as(GoogleCloudOptions).job_name = job_name self.options.view_as(GoogleCloudOptions).staging_location = staging_location self.options.view_as(GoogleCloudOptions).temp_location = temp_location self.options.view_as(GoogleCloudOptions).region = 'us-central1' self.options.view_as(GoogleCloudOptions).service_account_email = service_account_email self.options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED' # スループットに合わせて自動でスケーリング self.options.view_as(StandardOptions).runner = 'DataflowRunner' # データフローのみでの実行を想定 class BqClient(bigquery.Client): def __init__(self, project_id, service_account_email): self.project_id = project_id self.service_account_email = service_account_email super().__init__(self.project_id) def unload_table_as_parquet(self, table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output): staging_location = 'gs://{}/{}'.format(gs_bucket, gs_staging) temp_location = 'gs://{}/{}'.format(gs_bucket, gs_temp) output_location = 'gs://{}/{}'.format(gs_bucket, gs_output) query = "SELECT * FROM {}".format(table_id) print('Start unload from {}'.format(table_id)) print(' to {}'.format(output_location)) # BQのデータ型 → Parquetのデータ型にマッピング data_type_mapping = { 'STRING': pa.string(), 'BYTES': pa.string(), 'INTEGER': pa.int64(), 'NUMERIC': pa.decimal128(18, 2), # fixed_len_byte_array化を避けるための18,2 'FLOAT': pa.float64(), 'BOOLEAN': pa.bool_(), 'TIMESTAMP': pa.timestamp(unit='s'), 'DATE': pa.date64(), 'DATETIME': pa.timestamp(unit='s'), #'ARRAY': pa.list_(), #'STRUCT': pa.struct() } table = self.get_table(table_id) print([(f.name, f.field_type) for f in table.schema]) parquet_schema = pa.schema( [pa.field(f.name, data_type_mapping[f.field_type]) for f in table.schema]) # オプションの設定 options = DataFlowOptions(self.project_id, self.service_account_email, job_name, staging_location, temp_location) p = beam.Pipeline(options=options.options) # DataFlowのPipelineを定義 # Apache Beamは記法がシェルっぽく独特 ( p | 'Input: ReadTable' >> beam.io.gcp.bigquery.ReadFromBigQuery( query=query, use_standard_sql=True) | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet( file_path_prefix=output_location, schema=parquet_schema, file_name_suffix='.parquet') ) print('Run pipeline') print('Query: {}'.format(query)) result = p.run() result.wait_until_finish() print('Pipeline successfully finished') if __name__ == '__main__': table_id = 'tmp.test_parquet_export' job_name = 'test-parquet-export' gs_bucket = 'test_parquet_export' gs_staging = 'dataflow/staging/' gs_temp = 'dataflow/temp/' gs_output = 'dataflow/output/' bq = BqClient(project_id='haruta-takumi', service_account_email='**************@haruta-takumi.iam.gserviceaccount.com') bq.unload_table_as_parquet(table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output)
解説は後回しとしまして、とりあえず実行してみます。if __name__ == '__main__':
内の各種変数を設定してもらえれば、python bq_parquet_export.py
でデフォルトのサービスアカウントのPathから認証情報を自動で取得してDataflowワーカーが起動します。Dataflowのコンソールからログの確認も可能です。
出力ファイルは指定したGCSバケットに配置されます。ダウンロードして中身を確認してみると、ちゃんとParquetで出力されていることが確認できます。
$ parquet-tools head dataflow_output_-00000-of-00001.parquet title = Cricket id = 25675557 language = wp_namespace = 0 revision_id = 171533936 contributor_ip = 128.36.84.151 timestamp = 1195079595 num_characters = 56665 title = Sepp Herberger id = 1465240 language = wp_namespace = 0 revision_id = 58775695 contributor_ip = 132.187.253.19 timestamp = 1150385515 num_characters = 1295 title = Necronomicon (H. R. Giger) id = 10972018 language = wp_namespace = 0 revision_id = 209477280 contributor_ip = 87.41.57.166 timestamp = 1209657004 num_characters = 1490 title = Daisy Meadows id = 8627561 language = wp_namespace = 0 revision_id = 247988524 contributor_ip = 86.156.164.72 timestamp = 1225118943 num_characters = 3271 title = Angela Bassett id = 508609 language = wp_namespace = 0 revision_id = 156106750 contributor_ip = 209.116.125.226 timestamp = 1189105044 num_characters = 12867
デモが見れたところで、コードの構成を解説していきます。
コード解説
使用するライブラリは、先ほどご紹介した通り、pyarrow
、apache_beam
、bigquery
の3つです。pyarrow
はParquet出力、apache_beam
はDataFlowの制御、bigquery
はテーブルの読み込みに使用しています。
Apache BeamでDataflowを起動する場合は、かなりの数のオプションの指定が必要です。今回はクラスのインスタンス変数としてまとめてしまいました。Parquet出力用の関数は、bigquery.Clientを継承したBqClient
クラスに定義していきます。
import pyarrow as pa import apache_beam as beam from google.cloud import bigquery from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import WorkerOptions class DataFlowOptions(): def __init__(self, project_id, service_account_email, job_name, staging_location, temp_location): self.options = PipelineOptions() self.options.view_as(GoogleCloudOptions).project = project_id self.options.view_as(GoogleCloudOptions).job_name = job_name self.options.view_as(GoogleCloudOptions).staging_location = staging_location self.options.view_as(GoogleCloudOptions).temp_location = temp_location self.options.view_as(GoogleCloudOptions).region = 'us-central1' self.options.view_as(GoogleCloudOptions).service_account_email = service_account_email self.options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED' # スループットに合わせて自動でスケーリング self.options.view_as(StandardOptions).runner = 'DataflowRunner' # データフローのみでの実行を想定 class BqClient(bigquery.Client): def __init__(self, project_id, service_account_email): self.project_id = project_id self.service_account_email = service_account_email super().__init__(self.project_id)
Parquet出力用のメソッドです。前半部分ではDataflowで使用するGCSのパスの指定や、SQLクエリの作成を行っています。その後、BigQueryのカラム型をParquetのカラム型に変換するためのマッパーを作成します。この調節が結構大変なポイントですね…。使用できる型がpyarrow
のバージョンにかなり依存しているので、実際に導入される際は、Apache Arrowの仕様、Parquetの仕様、BigQueryの仕様の3つをよく照らし合わせてください。
- Data Types and Schemas — Apache Arrow v2.0.0
- parquet-format/LogicalTypes.md at master · apache/parquet-format
- 標準 SQL のデータ型 | BigQuery | Google Cloud
BqClientのget_tableで出力するテーブルのスキーマ情報を取得し、Parquet用のカラム型に変換していきます。
def unload_table_as_parquet(self, table_id, job_name, gs_bucket, gs_staging, gs_temp, gs_output): staging_location = 'gs://{}/{}'.format(gs_bucket, gs_staging) temp_location = 'gs://{}/{}'.format(gs_bucket, gs_temp) output_location = 'gs://{}/{}'.format(gs_bucket, gs_output) query = "SELECT * FROM {}".format(table_id) print('Start unload from {}'.format(table_id)) print(' to {}'.format(output_location)) # BQのデータ型 → Parquetのデータ型にマッピング data_type_mapping = { 'STRING': pa.string(), 'BYTES': pa.string(), 'INTEGER': pa.int64(), 'NUMERIC': pa.decimal128(18, 2), # fixed_len_byte_array化を避けるための18,2 'FLOAT': pa.float64(), 'BOOLEAN': pa.bool_(), 'TIMESTAMP': pa.timestamp(unit='s'), 'DATE': pa.date64(), 'DATETIME': pa.timestamp(unit='s'), #'ARRAY': pa.list_(), #'STRUCT': pa.struct() } table = self.get_table(table_id) print([(f.name, f.field_type) for f in table.schema]) parquet_schema = pa.schema( [pa.field(f.name, data_type_mapping[f.field_type]) for f in table.schema])
そして、先ほど定義したDataFlowOptions
クラスを使用してオプションを作成し、Apache Beamのパイプラインを定義します。このパイプラインの書き方がシェルを意識しているようで、かなり独特です。
Input: ReadTable
はDataflow側で表示されるステップ名となり、ここではBigQueryのテーブルを読み込んでいます。ReadFromBigQuery
には実に様々なオプションが用意されていますが、SQLを使用するのが一番融通が効くかなと思います。Output: Export to Parquet
では、pyarrowを用いたParquet出力を定義しており、先ほどマッピングしたスキーマ情報と出力先を指定します。
パイプラインを定義したら、run()
メソッドで非同期実行し、wait_until_finish()
を続けて実行することでジョブが完了するまで待機することができます。
# オプションの設定 options = DataFlowOptions(self.project_id, self.service_account_email, job_name, staging_location, temp_location) p = beam.Pipeline(options=options.options) # DataFlowのPipelineを定義 # Apache Beamは記法がシェルっぽく独特 ( p | 'Input: ReadTable' >> beam.io.gcp.bigquery.ReadFromBigQuery( query=query, use_standard_sql=True) | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet( file_path_prefix=output_location, schema=parquet_schema, file_name_suffix='.parquet') ) print('Run pipeline') print('Query: {}'.format(query)) result = p.run() result.wait_until_finish() print('Pipeline successfully finished')
Parquet出力の所感
ここまで実装するのに、正直かなり時間がかかりました。Apache Beamのクセが強く保守性があまり良くないので、今後BigQueryの機能としてParquet出力が登場したら、そっちに切り替えたいところですね。